Skip to content

Conversation

@kibertoad
Copy link
Owner

@kibertoad kibertoad commented Dec 12, 2025

Summary by CodeRabbit

  • Breaking Changes

    • Handler APIs now require explicit messageType when registering/adding processed messages; schema inputs now use entry objects and resolution returns a result wrapper on error.
  • New Features

    • Pre-built message-type resolvers added for GCP Pub/Sub, SQS/EventBridge and GCS events.
  • Improvements

    • Message type resolution supports nested dot-paths; stronger validation for custom resolvers with multiple schemas; clearer error messages.
  • Documentation

    • Migration guidance and examples updated.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 12, 2025

Warning

Rate limit exceeded

@kibertoad has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 2 minutes and 1 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 7843189 and c1e2671.

📒 Files selected for processing (7)
  • packages/core/lib/queues/MessageTypeResolver.ts (6 hunks)
  • packages/core/test/queues/MessageTypeResolver.spec.ts (2 hunks)
  • packages/gcp-pubsub/README.md (1 hunks)
  • packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts (1 hunks)
  • packages/sqs/README.md (1 hunks)
  • packages/sqs/lib/index.ts (1 hunks)
  • packages/sqs/lib/utils/sqsMessageTypeResolvers.ts (1 hunks)

Walkthrough

Message-type resolution was refactored: schemas/definitions are now passed as entry objects with optional explicit messageType; messageTypePath supports dot-notation; custom resolvers require explicit messageType and cannot be used with multiple schemas; HandlerSpy.addProcessedMessage requires a messageType; resolver errors are returned as Either instead of throwing.

Changes

Cohort / File(s) Summary
Docs & Migration
UPGRADING.md, packages/core/README.md
Migration guidance updated for entry-based schemas/definitions, explicit messageType when using custom resolvers, dot-notation messageTypePath examples, and new resolveSchema result/error handling.
Message schema container
packages/core/lib/queues/MessageSchemaContainer.ts, packages/core/lib/index.ts
Added SchemaEntry and DefinitionEntry types; options now take { schema, messageType? } / { definition, messageType? }; added resolveSchemaMap/resolveDefinitionMap, multi-schema validation, explicit-type precedence, and resolver-error handling returning Either<Error, Schema>.
Message type resolver
packages/core/lib/queues/MessageTypeResolver.ts, packages/core/test/queues/MessageTypeResolver.spec.ts
messageTypePath supports dot-notation via dot-prop/path traversal; schema extraction updated to traverse nested shapes; error wording changed from "field"→"path".
Handler & registration
packages/core/lib/queues/HandlerContainer.ts, packages/core/test/queues/HandlerContainer.spec.ts
Error text clarified to indicate inability to determine type "at registration time"; explicit messageType required for handlers using custom resolvers; tests updated accordingly.
HandlerSpy API
UPGRADING.md
HandlerSpy.addProcessedMessage(...) now requires an explicit messageType parameter.
Abstract queue / publisher adapters
packages/core/lib/queues/AbstractQueueService.ts, packages/kafka/lib/AbstractKafkaPublisher.ts
Sites constructing MessageSchemaContainer now wrap schemas/definitions as entry objects ({ schema }, { schema, messageType }, { definition, messageType }); offload logic uses dot-prop get/set to preserve nested messageTypePath.
GCP & SQS resolvers
packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts, packages/sqs/lib/utils/sqsMessageTypeResolvers.ts, packages/gcp-pubsub/lib/index.ts, packages/sqs/lib/index.ts
Added reusable message-type resolver factories and concrete resolvers (CloudEvents, GCS, EventBridge) and re-exported them from packages.
Tests & config
packages/core/test/queues/MessageSchemaContainer.spec.ts, packages/core/test/queues/HandlerContainer.spec.ts, packages/core/vitest.config.ts, packages/amqp/test/utils/testContext.ts, packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts
New comprehensive MessageSchemaContainer and resolver tests; wording updates in handler tests; vitest coverage thresholds raised; test logger import adjusted; added payload-offloading test covering nested messageTypePath.
Package deps
packages/core/package.json
Added dependency dot-prop for nested path access.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Focus review on:
    • packages/core/lib/queues/MessageSchemaContainer.ts — mapping/validation for multiple schemas, precedence rules, and Either-based error handling.
    • packages/core/lib/queues/MessageTypeResolver.ts — dot-path traversal and schema-shape extraction correctness.
    • Call sites converting plain schema arrays to entry shapes (AbstractQueueService, Kafka publisher, other adapters).
    • Tests that assert new error messages and resolver behaviors (ensure messages and semantics match runtime code).

Possibly related PRs

Suggested reviewers

  • CarlosGamero
  • kjamrog

Poem

🐰 I hopped through dotted paths and literal clues,

I sniffed each schema, chased the type like news.
With explicit names and errors kept kind,
I leave tidy trails for whoever's behind. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 inconclusive)
Check name Status Explanation Resolution
Title check ❓ Inconclusive The title 'Handle edge cases' is vague and generic; it does not convey meaningful information about the substantial changes in the PR (messageType resolver refactoring, nested path support, API updates to MessageSchemaContainer). Use a more specific title that reflects the main changes, such as 'Refactor messageType resolution with support for nested paths and explicit type mappings' or 'Update MessageSchemaContainer API with SchemaEntry and DefinitionEntry wrapper types'.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Docstring Coverage ✅ Passed Docstring coverage is 83.33% which is sufficient. The required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/core/lib/queues/MessageTypeResolver.ts (1)

177-205: Validate extracted literal is a string; current cast bypasses type contract and allows non-string literals.

The function's return type explicitly states string | undefined, but the implementation uses field.value as string | undefined without validation. Since Zod v4 supports non-string literals (numbers, booleans, etc.), a schema with z.literal(123) would silently return a number, violating the return type contract and potentially causing type errors in routing maps.

   // Check if the field has a literal value (z.literal() creates a field with .value)
   if (!('value' in field)) {
     return undefined
   }

-  return field.value as string | undefined
+  const value = (field as { value?: unknown }).value
+  return typeof value === 'string' ? value : undefined
🧹 Nitpick comments (4)
UPGRADING.md (1)

46-51: Consider explicitly stating messageType in handler options is only required for custom resolver functions.
Right now the examples show messageType even in literal mode; that’s fine, but a one-liner clarification would reduce confusion.

packages/core/test/queues/MessageSchemaContainer.spec.ts (2)

25-38: Strengthen assertions: verify resolved schema matches the expected schema (not just result presence).

-      const resultA = container.resolveSchema({ type: 'message.a', payload: 'test' })
-      expect('result' in resultA).toBe(true)
+      const resultA = container.resolveSchema({ type: 'message.a', payload: 'test' })
+      expect(resultA).toEqual({ result: MESSAGE_SCHEMA_A })

-      const resultB = container.resolveSchema({ type: 'message.b', payload: 123 })
-      expect('result' in resultB).toBe(true)
+      const resultB = container.resolveSchema({ type: 'message.b', payload: 123 })
+      expect(resultB).toEqual({ result: MESSAGE_SCHEMA_B })

146-159: Clarify the “no literal type field” test comments (they currently contradict the expected error).
Right now it says “works fine” but asserts failure; I’d align the comments with the new “no DEFAULT fallback when resolver exists” behavior.

packages/core/lib/queues/MessageSchemaContainer.ts (1)

141-166: Consider making { literal } + multiple schemas fail with a clearer message than “Duplicate schema…”.
Right now it fails indirectly via duplicate key; an explicit check could improve UX.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ea77735 and cddb419.

📒 Files selected for processing (7)
  • UPGRADING.md (2 hunks)
  • packages/core/lib/queues/HandlerContainer.ts (1 hunks)
  • packages/core/lib/queues/MessageSchemaContainer.ts (4 hunks)
  • packages/core/lib/queues/MessageTypeResolver.ts (2 hunks)
  • packages/core/test/queues/HandlerContainer.spec.ts (3 hunks)
  • packages/core/test/queues/MessageSchemaContainer.spec.ts (1 hunks)
  • packages/core/vitest.config.ts (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/core/test/queues/MessageSchemaContainer.spec.ts (1)
packages/core/lib/queues/MessageSchemaContainer.ts (1)
  • MessageSchemaContainer (25-170)
packages/core/lib/queues/MessageSchemaContainer.ts (2)
packages/core/lib/queues/MessageTypeResolver.ts (4)
  • MessageTypeResolverConfig (97-117)
  • MessageTypeResolverContext (5-15)
  • resolveMessageType (154-175)
  • isMessageTypeResolverFnConfig (140-144)
packages/core/lib/queues/HandlerContainer.ts (1)
  • resolveMessageType (296-306)
🪛 LanguageTool
UPGRADING.md

[style] ~9-~9: This phrase is redundant. Consider using “outside”.
Context: ... if you're using HandlerSpy directly (outside of the built-in queue services), you'll ne...

(OUTSIDE_OF)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
  • GitHub Check: kafka (22.x) / build
  • GitHub Check: kafka (24.x) / build
🔇 Additional comments (6)
packages/core/vitest.config.ts (1)

18-23: Coverage threshold bump looks fine—just ensure it matches CI’s actual coverage reporting.

packages/core/lib/queues/HandlerContainer.ts (1)

361-366: Improved registration-time failure message (clearer action items).

packages/core/test/queues/HandlerContainer.spec.ts (2)

162-196: Rename to “messageTypePath resolver” matches current API and reduces legacy confusion.


282-318: Updated assertions for registration-time failure read correctly and match new guidance.

packages/core/lib/queues/MessageSchemaContainer.ts (2)

44-74: resolveSchema behavior is now consistent and safer (no-resolver default + resolver error → Either).


112-134: Registration-time validation for multi-schema + custom resolver is a good constraint.
This prevents “it registers but can’t be routed deterministically” setups.

Comment on lines 7 to 10
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.

- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Small wording nit: “outside of” → “outside” (avoids redundancy).

-- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
+- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
🧰 Tools
🪛 LanguageTool

[style] ~9-~9: This phrase is redundant. Consider using “outside”.
Context: ... if you're using HandlerSpy directly (outside of the built-in queue services), you'll ne...

(OUTSIDE_OF)

🤖 Prompt for AI Agents
In UPGRADING.md around lines 7 to 10, change the wording "outside of the
built-in queue services" to "outside the built-in queue services" to remove the
redundant "of"; update the sentence accordingly so it reads that callers using
HandlerSpy directly outside the built-in queue services need to update their
calls.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/core/lib/queues/AbstractQueueService.ts (1)

683-688: Fix message-type preservation for dot-notated messageTypePath during payload offloading.

The current code only works for root-level fields. When using a nested path like metadata.type, it writes a literal key "metadata.type" on the root object instead of creating the nested structure.

+import { getProperty, setProperty } from 'dot-prop'
@@
     if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) {
       const messageTypePath = this.messageTypeResolver.messageTypePath
-      // @ts-expect-error
-      result[messageTypePath] = message[messageTypePath]
+      const value = getProperty(message as unknown, messageTypePath)
+      if (value !== undefined) {
+        setProperty(result as unknown, messageTypePath, value)
+      }
     }

dot-prop is already a dependency in the package, and this approach aligns with how resolveMessageType handles nested paths.

♻️ Duplicate comments (1)
UPGRADING.md (1)

9-9: Small wording nit: "outside of" → "outside".

This was flagged in a previous review. Consider removing the redundant "of" for cleaner phrasing.

-- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
+- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
🧹 Nitpick comments (5)
packages/amqp/test/utils/testContext.ts (1)

90-110: Use a child logger or silent logger for test isolation. globalLogger is a singleton Pino instance shared across all tests. While Pino loggers are stateful and could be mutated by any test, you can prevent cross-test pollution by either creating a child logger (via Pino's .child() method) or configuring a silent logger for tests. If using globalLogger directly, consider wrapping it in a test-scoped child logger to ensure test isolation.

packages/core/test/queues/MessageTypeResolver.spec.ts (1)

70-118: Nice coverage for nested path resolution + schema extraction; error messaging assertions updated coherently.

Optional: add a case like messageData: 'oops' (or []) to ensure resolveMessageType fails deterministically with the same “path not found” error.

Also applies to: 238-292

packages/core/README.md (1)

91-101: Docs reflect dot-path support and the new schema-entry/resolveSchema result shape.

If resolveSchema() returns { error?: ...; result?: ... }, consider simplifying the example to if (result.error) { ... } else { ... } to avoid implying a strict discriminated union by key presence.

Also applies to: 540-552

packages/core/lib/queues/MessageSchemaContainer.ts (2)

180-186: Edge case: unclear error when type extraction fails for multiple schemas.

If messageTypePath is configured but multiple schemas lack literal values at that path, they'll all fall back to DEFAULT_SCHEMA_KEY, producing the error "Duplicate schema for type: Symbol(NO_MESSAGE_TYPE)". This message may confuse users.

Consider adding a more descriptive error when type extraction fails with multiple schemas:

      const key = type ?? DEFAULT_SCHEMA_KEY
-     if (result[key]) throw new Error(`Duplicate schema for type: ${key.toString()}`)
+     if (result[key]) {
+       const typeDesc = key === DEFAULT_SCHEMA_KEY
+         ? 'default (type could not be extracted from schema)'
+         : key.toString()
+       throw new Error(`Duplicate schema for type: ${typeDesc}. Ensure each schema has a unique literal value at the configured messageTypePath, or provide explicit messageType on each entry.`)
+     }

191-223: Consider validation consistency between schemas and definitions.

resolveSchemaMap validates multiple schemas (line 159), but resolveDefinitionMap doesn't perform similar validation. If multiple definitions are provided with a custom resolver function, they would all map to DEFAULT_SCHEMA_KEY and trigger a duplicate error.

If this is intentional (e.g., definitions have different usage patterns), a comment explaining why would be helpful. Otherwise, consider applying similar validation:

  private resolveDefinitionMap(
    entries: readonly DefinitionEntry[],
  ): Record<string | symbol, CommonEventDefinition> {
    const result: Record<string | symbol, CommonEventDefinition> = {}

+   // Note: Unlike schemas, definitions don't require validation for multiple entries
+   // because [explain rationale here]
+
    const literalType = this.getLiteralMessageType()

Or if validation should be added:

  private resolveDefinitionMap(
    entries: readonly DefinitionEntry[],
  ): Record<string | symbol, CommonEventDefinition> {
    const result: Record<string | symbol, CommonEventDefinition> = {}

+   this.validateMultipleSchemas(entries.length)
+
    const literalType = this.getLiteralMessageType()
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cddb419 and fca5f14.

📒 Files selected for processing (12)
  • UPGRADING.md (2 hunks)
  • packages/amqp/test/utils/testContext.ts (2 hunks)
  • packages/core/README.md (2 hunks)
  • packages/core/lib/index.ts (1 hunks)
  • packages/core/lib/queues/AbstractQueueService.ts (2 hunks)
  • packages/core/lib/queues/MessageSchemaContainer.ts (4 hunks)
  • packages/core/lib/queues/MessageTypeResolver.ts (6 hunks)
  • packages/core/package.json (1 hunks)
  • packages/core/test/queues/HandlerContainer.spec.ts (4 hunks)
  • packages/core/test/queues/MessageSchemaContainer.spec.ts (1 hunks)
  • packages/core/test/queues/MessageTypeResolver.spec.ts (2 hunks)
  • packages/kafka/lib/AbstractKafkaPublisher.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/core/test/queues/MessageSchemaContainer.spec.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/core/lib/queues/MessageSchemaContainer.ts (2)
packages/schemas/lib/events/eventTypes.ts (1)
  • CommonEventDefinition (22-33)
packages/core/lib/queues/MessageTypeResolver.ts (5)
  • MessageTypeResolverConfig (101-122)
  • MessageTypeResolverContext (7-17)
  • resolveMessageType (159-182)
  • isMessageTypeResolverFnConfig (145-149)
  • extractMessageTypeFromSchema (193-222)
🪛 LanguageTool
UPGRADING.md

[style] ~9-~9: This phrase is redundant. Consider using “outside”.
Context: ... if you're using HandlerSpy directly (outside of the built-in queue services), you'll ne...

(OUTSIDE_OF)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
  • GitHub Check: general (22.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/gcs-payload-store) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/gcs-payload-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/core) / build
  • GitHub Check: kafka (22.x) / build
  • GitHub Check: kafka (24.x) / build
🔇 Additional comments (12)
packages/amqp/test/utils/testContext.ts (1)

1-6: Import change looks fine; keep type-only imports to avoid runtime coupling.
type imports for CommonLogger / ErrorReporter / ErrorResolver are a good choice; only globalLogger is runtime-imported.

packages/kafka/lib/AbstractKafkaPublisher.ts (1)

46-51: Schema container wiring updated correctly for new { schema } entry shape.

packages/core/lib/index.ts (1)

75-80: Public re-exports look consistent with the new MessageSchemaContainer API.

packages/core/test/queues/HandlerContainer.spec.ts (1)

163-196: Updated tests match the new path-based resolver wording and clearer registration-time failure mode.

Also applies to: 283-317

packages/core/lib/queues/AbstractQueueService.ts (1)

179-190: Schema/definition entries now carry explicit messageType as intended; publisher wrapping is consistent.

Also applies to: 202-209

packages/core/package.json (1)

27-36: No compatibility concerns with dot-prop@^10.1.0.

The import style import { getProperty } from 'dot-prop' used in packages/core/lib/queues/MessageTypeResolver.ts is correctly aligned with dot-prop v10.1.0's ESM named exports. The package.json declares "type": "module" and targets @types/node@^24.10.1, well above the minimum Node version needed for stable ESM support. The dependency is ready to use.

UPGRADING.md (1)

42-52: Documentation updates look good.

The descriptions for messageTypePath dot notation support, the new SchemaEntry/DefinitionEntry API, custom resolver validation, and improved error handling are clear and provide helpful migration context.

packages/core/lib/queues/MessageSchemaContainer.ts (5)

14-30: Well-designed entry types with clear documentation.

The SchemaEntry and DefinitionEntry types cleanly encapsulate the schema/definition with optional explicit type mapping. The JSDoc comments correctly indicate when messageType is required.


49-53: Constructor initialization is correct.

The resolver is set before map resolution methods are called, ensuring this.messageTypeResolver is available. The nullish coalescing for messageDefinitions handles undefined gracefully.


62-92: Robust error handling with consistent Either pattern.

The method correctly:

  • Returns a clear error when no resolver is configured and no default schema exists
  • Wraps resolver exceptions in Either.error instead of throwing
  • Provides descriptive error messages for unsupported message types

98-106: Clean delegation to resolver utility.

The type assertion on line 103 is safe given the documented precondition that this method is only called after checking messageTypeResolver exists. The comment clarifies this assumption.


130-152: Validation logic correctly prevents misconfiguration.

The method properly enforces that:

  1. Multiple schemas require a messageTypeResolver
  2. Custom resolver functions cannot be used with multiple schemas (since type mapping cannot be determined at registration time)

Error messages are clear and guide users to valid configurations.

Comment on lines 201 to 222

const pathParts = messageTypePath.split('.')
// biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any
let current: any = schema

for (const part of pathParts) {
if (!current?.shape) {
return undefined
}
current = current.shape[part]
if (!current) {
return undefined
}
}

// Check if the final field has a literal value (z.literal() creates a field with .value)
if (!('value' in current)) {
return undefined
}

return current.value as string | undefined
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Guard extractMessageTypeFromSchema against primitives before using the in operator.

'value' in current throws if current is a primitive.

-  if (!('value' in current)) {
+  if (current === null || (typeof current !== 'object' && typeof current !== 'function')) {
+    return undefined
+  }
+  if (!('value' in current)) {
     return undefined
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const pathParts = messageTypePath.split('.')
// biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any
let current: any = schema
for (const part of pathParts) {
if (!current?.shape) {
return undefined
}
current = current.shape[part]
if (!current) {
return undefined
}
}
// Check if the final field has a literal value (z.literal() creates a field with .value)
if (!('value' in current)) {
return undefined
}
return current.value as string | undefined
}
const pathParts = messageTypePath.split('.')
// biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any
let current: any = schema
for (const part of pathParts) {
if (!current?.shape) {
return undefined
}
current = current.shape[part]
if (!current) {
return undefined
}
}
// Check if the final field has a literal value (z.literal() creates a field with .value)
if (current === null || (typeof current !== 'object' && typeof current !== 'function')) {
return undefined
}
if (!('value' in current)) {
return undefined
}
return current.value as string | undefined
}
🤖 Prompt for AI Agents
In packages/core/lib/queues/MessageTypeResolver.ts around lines 201 to 222, the
final check uses "'value' in current" which will throw if current is a
primitive; guard against primitives by first ensuring current is a non-null
object (e.g. current !== null && typeof current === 'object') before using the
in operator or hasOwnProperty, then return current.value as string | undefined
if present; update the conditional to perform that object check and only then
test for 'value' in current to avoid runtime errors.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
packages/sqs/lib/utils/sqsMessageTypeResolvers.ts (1)

97-126: Consider handling empty string detail-type.

The resolver checks for undefined and null but allows empty strings through. An empty string would pass the null check but may cause issues downstream if an empty message type is returned.

If empty strings should be treated as missing, consider:

-      if (detailType === undefined || detailType === null) {
+      if (detailType === undefined || detailType === null || detailType === '') {
packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts (1)

331-331: Consider initializing receivedMessage with a more descriptive initial state or moving inside the test.

The receivedMessage variable is declared outside the test but only used within it. This pattern works but could be cleaner by moving it inside the test function or using a wrapper object for clarity.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fca5f14 and 7843189.

📒 Files selected for processing (8)
  • packages/core/lib/queues/AbstractQueueService.ts (4 hunks)
  • packages/gcp-pubsub/lib/index.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts (1 hunks)
  • packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts (1 hunks)
  • packages/sqs/lib/index.ts (1 hunks)
  • packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts (1 hunks)
  • packages/sqs/lib/utils/sqsMessageTypeResolvers.ts (1 hunks)
  • packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
packages/sqs/lib/utils/sqsMessageTypeResolvers.ts (1)
packages/sqs/lib/index.ts (3)
  • EVENT_BRIDGE_DETAIL_TYPE_FIELD (42-42)
  • EVENT_BRIDGE_TYPE_RESOLVER (43-43)
  • createEventBridgeResolverWithMapping (41-41)
packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts (2)
packages/sqs/lib/index.ts (3)
  • EVENT_BRIDGE_TYPE_RESOLVER (43-43)
  • EVENT_BRIDGE_DETAIL_TYPE_FIELD (42-42)
  • createEventBridgeResolverWithMapping (41-41)
packages/sqs/lib/utils/sqsMessageTypeResolvers.ts (3)
  • EVENT_BRIDGE_TYPE_RESOLVER (59-61)
  • EVENT_BRIDGE_DETAIL_TYPE_FIELD (14-14)
  • createEventBridgeResolverWithMapping (97-126)
packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts (1)
packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts (8)
  • createAttributeResolver (65-78)
  • createAttributeResolverWithMapping (104-130)
  • CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER (153-154)
  • CLOUD_EVENTS_TYPE_ATTRIBUTE (23-23)
  • GCS_EVENT_TYPES (35-40)
  • GCS_NOTIFICATION_TYPE_RESOLVER (184-194)
  • GCS_EVENT_TYPE_ATTRIBUTE (29-29)
  • GCS_NOTIFICATION_RAW_TYPE_RESOLVER (218-219)
packages/core/lib/queues/AbstractQueueService.ts (1)
packages/core/lib/queues/MessageTypeResolver.ts (1)
  • isMessageTypePathConfig (127-131)
packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts (2)
packages/core/lib/payload-store/payloadStoreTypes.ts (1)
  • SinglePayloadStoreConfig (32-47)
packages/core/lib/queues/HandlerContainer.ts (1)
  • MessageHandlerConfigBuilder (135-225)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
  • GitHub Check: general (24.x, @message-queue-toolkit/gcs-payload-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/outbox-core) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/core) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
  • GitHub Check: kafka (24.x) / build
  • GitHub Check: kafka (22.x) / build
🔇 Additional comments (21)
packages/sqs/lib/utils/sqsMessageTypeResolvers.ts (1)

1-61: Well-documented EventBridge resolver configuration.

The constant and pre-built resolver are clearly documented with examples. The use of messageTypePath aligns with the core package's dot-prop path resolution support.

packages/core/lib/queues/AbstractQueueService.ts (4)

15-15: Appropriate use of dot-prop for nested path handling.

The dot-prop library is well-suited for reading/writing nested properties using dot-notation paths.


180-190: Schema container resolution now includes per-entry messageType.

The mapping correctly extracts both schema and messageType for each handler entry, and filters out undefined definitions before mapping. The non-null assertion on line 188 is properly guarded by the filter on line 185.


203-203: Publisher schema container updated to use object entries.

This aligns with the consumer schema container pattern, wrapping schemas in objects for consistency.


684-691: Nested path preservation for offloaded payloads looks correct.

The logic properly uses getProperty and setProperty from dot-prop to preserve the message type at nested paths when offloading. The undefined check ensures we don't set undefined values.

packages/sqs/lib/index.ts (1)

40-44: Clean public API extension for EventBridge resolvers.

The new exports are properly grouped and extend the SQS package's public API with EventBridge message type resolution utilities.

packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts (2)

260-290: Well-structured test setup for nested path offloading.

The test suite properly documents its purpose and sets up isolated infrastructure (bucket, queue) with appropriate lifecycle management.


292-405: Comprehensive test for nested messageTypePath preservation.

The test effectively verifies that:

  1. Large messages with nested type paths are correctly offloaded to S3
  2. The nested metadata.type field is preserved in the pointer message
  3. Consumers can correctly route and consume the offloaded message

The assertions at lines 388-400 validate both the full message content and the handler's received message.

packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts (3)

1-9: Clean test file setup with appropriate imports.

The imports correctly bring in resolveMessageType from core for integration testing and the local resolver utilities.


62-131: Comprehensive tests for createEventBridgeResolverWithMapping.

The test suite covers all key scenarios:

  • Successful type mapping
  • Error on unmapped type (without fallback)
  • Fallback to original value
  • Missing detail-type field
  • Null detail-type value

The error message assertions use partial matching (.toThrow("substring")), which is robust against minor message changes.


11-60: Good test coverage for EVENT_BRIDGE_TYPE_RESOLVER.

The tests cover:

  • Basic extraction from a full EventBridge event structure
  • Various detail-type value formats
  • Error case when the field is missing

The error message assertion is valid—Jest's toThrow() with a string performs substring matching, so the expected message "path 'detail-type' not found" will correctly match the actual error thrown by resolveMessageType ("Unable to resolve message type: path 'detail-type' not found in message data").

packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts (4)

1-41: LGTM! Well-documented constants.

The constant definitions are clear and well-organized. The documentation links to official GCP resources are helpful for maintainability.


65-78: LGTM! Clean implementation with proper error handling.

The function correctly extracts the message type from attributes and handles missing/null values appropriately. The explicit null check on line 70 is good defensive programming.


104-130: LGTM! Excellent error messaging.

The mapping logic is correct and the error message on line 126 helpfully lists available mappings, making debugging easier for users.


153-219: LGTM! Excellent documentation and API design.

The pre-built resolvers provide convenient defaults for common GCP scenarios. The documentation examples clearly demonstrate usage patterns, and the distinction between mapped and raw resolvers is well-articulated.

packages/gcp-pubsub/lib/index.ts (1)

11-11: LGTM! Correct public API exposure.

The new export properly exposes the GCP message type resolvers module and follows the existing export pattern.

packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts (5)

16-60: LGTM! Comprehensive edge case coverage.

The tests thoroughly cover happy path and all error scenarios (missing attribute, undefined attributes, null value). This ensures robust error handling.


62-117: LGTM! Thorough mapping behavior validation.

The tests verify all aspects of the mapping resolver: successful mapping, unmapped value errors, fallback behavior, and missing attribute errors. Well done.


119-143: LGTM! CloudEvents resolver validated.

The tests verify both successful extraction and error handling for the CloudEvents binary mode resolver. Good use of the constant in assertions for maintainability.


145-187: LGTM! Efficient and comprehensive GCS resolver tests.

The parameterized test approach on lines 154-165 efficiently validates all four GCS event type mappings while maintaining readability. Fallback and error cases are properly covered.


189-200: LGTM! Raw resolver behavior verified.

The test correctly validates that the raw resolver returns the GCS event type without any mapping transformations.

@kibertoad kibertoad merged commit d066ece into main Dec 12, 2025
37 checks passed
@kibertoad kibertoad deleted the fix/cleanup branch December 12, 2025 12:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants